package ca.wilkinsonlab.sadi.share;

import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.time.StopWatch;
import org.apache.log4j.Logger;

import ca.wilkinsonlab.sadi.SADIException;
import ca.wilkinsonlab.sadi.client.Config;
import ca.wilkinsonlab.sadi.client.MultiRegistry;
import ca.wilkinsonlab.sadi.client.Service;
import ca.wilkinsonlab.sadi.client.ServiceInputPair;
import ca.wilkinsonlab.sadi.client.ServiceInvocationException;
import ca.wilkinsonlab.sadi.client.virtual.sparql.SPARQLServiceWrapper;
import ca.wilkinsonlab.sadi.decompose.RestrictionAdapter;
import ca.wilkinsonlab.sadi.decompose.VisitingDecomposer;
import ca.wilkinsonlab.sadi.stats.PredicateStatsDB;
import ca.wilkinsonlab.sadi.utils.LabelUtils;
import ca.wilkinsonlab.sadi.utils.OwlUtils;
import ca.wilkinsonlab.sadi.utils.PropertyResolvabilityCache;
import ca.wilkinsonlab.sadi.utils.RdfUtils;
import ca.wilkinsonlab.sadi.utils.ResourceTyper;
import ca.wilkinsonlab.sadi.utils.VariableNameFactory;

import com.hp.hpl.jena.graph.Node;
import com.hp.hpl.jena.graph.Triple;
import com.hp.hpl.jena.ontology.Individual;
import com.hp.hpl.jena.ontology.OntClass;
import com.hp.hpl.jena.ontology.OntModel;
import com.hp.hpl.jena.ontology.OntModelSpec;
import com.hp.hpl.jena.ontology.OntProperty;
import com.hp.hpl.jena.ontology.OntResource;
import com.hp.hpl.jena.ontology.Restriction;
import com.hp.hpl.jena.query.Query;
import com.hp.hpl.jena.query.QueryFactory;
import com.hp.hpl.jena.query.Syntax;
import com.hp.hpl.jena.rdf.model.Model;
import com.hp.hpl.jena.rdf.model.ModelFactory;
import com.hp.hpl.jena.rdf.model.Property;
import com.hp.hpl.jena.rdf.model.RDFNode;
import com.hp.hpl.jena.rdf.model.Resource;
import com.hp.hpl.jena.rdf.model.Statement;
import com.hp.hpl.jena.sparql.core.TriplePath;
import com.hp.hpl.jena.sparql.syntax.ElementPathBlock;
import com.hp.hpl.jena.sparql.syntax.ElementTriplesBlock;
import com.hp.hpl.jena.sparql.syntax.ElementVisitorBase;
import com.hp.hpl.jena.sparql.syntax.ElementWalker;
import com.hp.hpl.jena.vocabulary.OWL;
import com.hp.hpl.jena.vocabulary.RDF;
import com.hp.hpl.jena.vocabulary.RDFS;

public class SHAREKnowledgeBase
{
	private static final Logger log = Logger.getLogger( SHAREKnowledgeBase.class );

	/** suffix appended to a property URI when an inverse property must be synthesized (see getInverseProperty) */
	private static final String AUTOGENERATED_INVERSE_PROPERTY_SUFFIX = "-inverse";

	/** aggregated view over all configured SADI service registries */
	private MultiRegistry registry;

	/** predicate statistics used for query planning; may be null if initialization failed */
	private PredicateStatsDB statsDB;

	/** ontology model (optionally with reasoning) that has dataModel attached as a sub-model */
	private OntModel reasoningModel;
	/** plain data model that accumulates service output */
	private Model dataModel;

	/** variable name => candidate values accumulated while resolving the query */
	private Map<String, PotentialValues> variableBindings;
	/** variable node => triple patterns that constrain that variable's eventual bindings */
	private Map<Node, Collection<Triple>> constraints;

	/** tracks (variable, class) pairs already visited during class decomposition, to avoid cycles */
	private Tracker tracker;
	/** URIs of services that must never be invoked (from the share.deadService config list) */
	private Set<String> deadServices;

	/** reasoner spec used when no explicit OntModel or spec string is supplied */
	public static final OntModelSpec DEFAULT_REASONER = OntModelSpec.OWL_MEM_MICRO_RULE_INF;
        public static final String DEFAULT_REASONER_STRING = "com.hp.hpl.jena.ontology.OntModelSpec.OWL_MEM_MICRO_RULE_INF";

	/* configuration keys; all but ROOT_CONFIG_KEY are read from the "share" subset */
	public static final String ROOT_CONFIG_KEY = "share";
	public static final String REASONER_SPEC_CONFIG_KEY = "reasoner";
	public static final String ALLOW_ARQ_SYNTAX_CONFIG_KEY = "allowARQSyntax";
	public static final String ALLOW_PREDICATE_VARIABLES_CONFIG_KEY = "allowPredicateVariables";
	public static final String USE_ADAPTIVE_QUERY_PLANNING_CONFIG_KEY = "useAdaptiveQueryPlanning";
	public static final String RECORD_QUERY_STATS_CONFIG_KEY = "recordQueryStats";
	public static final String INTERSECT_VARIABLE_BINDINGS_CONFIG_KEY = "intersectVariableBindings";
	public static final String RESOLVE_UNBOUND_PATTERNS_CONFIG_KEY = "resolveUnboundPatterns";
	public static final String STORE_INFERRED_TRIPLES_CONFIG_KEY = "storeInferredTriples";
	public static final String DYNAMIC_INPUT_CLASSIFICATION_CONFIG_KEY = "dynamicInputInstanceClassification";

	/** allow ARQ-specific extensions to SPARQL query syntax (e.g. GROUP BY, HAVING, arithmetic expressions) */
	private boolean allowARQSyntax;
	/** allow variables in the predicate positions of triple patterns */
	private boolean allowPredicateVariables;
	/** decide ordering of query patterns as the query runs */
	private boolean useAdaptiveQueryPlanning;
	/** record statistics during query execution */
	private boolean recordQueryStats;

	/**
	 * Compute intersections of bindings, if variables are encountered more than once
	 * in a query. In many cases this will speed up query evaluation significantly.
	 *
	 * NOTE: Queries with OPTIONAL clauses will not be resolved correctly when
	 * this option is on, and so it is turned off by default.
	 */
	private boolean intersectVariableBindings;

	/**
	 * Attempt to resolve patterns where both the subject and object are unbound,
	 * by: 1) finding a set of relevant services by predicate, and 2) discovering
	 * instances of the service input classes in reasoningModel.  This option
	 * is on by default.
	 */
	private boolean resolveUnboundPatterns;

	/**
	 * When adding service output to the local Model, explicitly record
	 * inferred triples due to equivalent properties and inverse properties.
	 * This option does not yet store inferences from subproperty relationships.
	 * It is pointless and detrimental to performance to turn this option
	 * on when a reasoningModel with inferencing is being used.  The option
	 * is off by default.
	 */
	private boolean storeInferredTriples;

	// classify potential service inputs dynamically during query execution
	// TODO rename to something less unwieldy?
	private boolean dynamicInputInstanceClassification;

	/** Create a knowledge base with the default reasoner and strict SPARQL syntax. */
	public SHAREKnowledgeBase()
	{
		this(ModelFactory.createOntologyModel(DEFAULT_REASONER), false);
	}

	/**
	 * Create a knowledge base with the default reasoner.
	 * @param allowARQSyntax allow ARQ extensions to SPARQL syntax
	 */
	public SHAREKnowledgeBase(boolean allowARQSyntax) {
		this(ModelFactory.createOntologyModel(DEFAULT_REASONER), allowARQSyntax);
	}

	/**
	 * Create a knowledge base over the supplied reasoning model with strict SPARQL syntax.
	 * @param reasoningModel the ontology model used for reasoning
	 */
	public SHAREKnowledgeBase(OntModel reasoningModel) {
		this(reasoningModel, false);
	}

	/**
	 * Create a knowledge base over the supplied reasoning model and a fresh data model.
	 * @param reasoningModel the ontology model used for reasoning
	 * @param allowARQSyntax allow ARQ extensions to SPARQL syntax
	 */
	public SHAREKnowledgeBase(OntModel reasoningModel, boolean allowARQSyntax)
	{
		this(reasoningModel, ModelFactory.createDefaultModel(), allowARQSyntax);
	}

	/**
	 * Create a knowledge base configured from the supplied configuration
	 * (reasoner spec and ARQ-syntax flag are read from it).
	 * @param config the configuration to read settings from
	 */
	public SHAREKnowledgeBase(Configuration config)
	{
	    this(ModelFactory.createOntologyModel(getReasonerSpec(config.getString(REASONER_SPEC_CONFIG_KEY, DEFAULT_REASONER_STRING))),
	            ModelFactory.createDefaultModel(), config.getBoolean(ALLOW_ARQ_SYNTAX_CONFIG_KEY, false));
	}

	/**
	 * Master constructor: every other constructor delegates here.
	 * Wires the supplied models together, reads all option flags from the
	 * SHARE configuration, and initializes the registry, the dead-service
	 * list and (best-effort) the predicate statistics database.
	 * @param reasoningModel the ontology model used for reasoning
	 * @param dataModel the model that accumulates service output (attached as a sub-model of reasoningModel)
	 * @param allowARQSyntax allow ARQ extensions to SPARQL syntax
	 */
	public SHAREKnowledgeBase(OntModel reasoningModel, Model dataModel, boolean allowARQSyntax)
	{
		// fixed stale class name in the message (this class is SHAREKnowledgeBase)
		log.debug("new ca.wilkinsonlab.sadi.share.SHAREKnowledgeBase instantiated");

		setAllowARQSyntax(allowARQSyntax);

		this.registry = Config.getConfiguration().getMasterRegistry();

		this.reasoningModel = reasoningModel;
		this.dataModel = dataModel;
		/* only attach the data model if the caller hasn't already done so */
		if (!reasoningModel.listSubModels().toSet().contains(dataModel)) {
			log.debug("adding data-only model as a sub-model of reasoning model");
			reasoningModel.addSubModel(dataModel);
		}

		variableBindings = new HashMap<String, PotentialValues>();
		constraints = new HashMap<Node, Collection<Triple>>();

		tracker = new Tracker();

		Configuration config = ca.wilkinsonlab.sadi.share.Config.getConfiguration();

		deadServices = Collections.synchronizedSet(new HashSet<String>());
		for (Object serviceUri: config.getList("share.deadService"))
			deadServices.add((String)serviceUri);

		/* stats DB is optional; a failure here leaves statsDB null and is logged */
		try {
			this.statsDB = new PredicateStatsDB(config.subset(PredicateStatsDB.ROOT_CONFIG_KEY));
		} catch(IOException e) {
			log.error("unable to initialize predicate stats db", e);
		}

		Configuration kbConfig = config.subset(ROOT_CONFIG_KEY);
		dynamicInputInstanceClassification = kbConfig.getBoolean(DYNAMIC_INPUT_CLASSIFICATION_CONFIG_KEY, false);
		setUseAdaptiveQueryPlanning(kbConfig.getBoolean(USE_ADAPTIVE_QUERY_PLANNING_CONFIG_KEY, false));
		setAllowPredicateVariables(kbConfig.getBoolean(ALLOW_PREDICATE_VARIABLES_CONFIG_KEY, false));
		setRecordQueryStats(kbConfig.getBoolean(RECORD_QUERY_STATS_CONFIG_KEY, false));
		/*
		 * NOTE: default value should be false here, OPTIONAL queries
		 * will not work when this option is on (see note above)
		 */
		this.intersectVariableBindings = kbConfig.getBoolean(INTERSECT_VARIABLE_BINDINGS_CONFIG_KEY, false);
//		skipPropertiesPresentInKB = kbConfig.getBoolean("skipPropertiesPresentInKB", false);

		this.resolveUnboundPatterns = kbConfig.getBoolean(RESOLVE_UNBOUND_PATTERNS_CONFIG_KEY, true);
		this.storeInferredTriples = kbConfig.getBoolean(STORE_INFERRED_TRIPLES_CONFIG_KEY, false);
	}

	/** Enable or disable ARQ extensions to SPARQL syntax. */
	protected void setAllowARQSyntax(boolean allowARQSyntax) {
		this.allowARQSyntax = allowARQSyntax;
	}

	/** @return true if ARQ extensions to SPARQL syntax are allowed */
	public boolean allowARQSyntax() {
		return this.allowARQSyntax;
	}

	/** @return the query syntax corresponding to the ARQ-syntax setting */
	public Syntax getQuerySyntax() {
		return (allowARQSyntax() ? Syntax.syntaxARQ : Syntax.syntaxSPARQL);
	}

	/** Enable or disable variables in the predicate position of triple patterns. */
	protected void setAllowPredicateVariables(boolean allowPredicateVariables) {
		this.allowPredicateVariables = allowPredicateVariables;
	}

	/** Enable or disable adaptive (run-time) query pattern ordering. */
	public void setUseAdaptiveQueryPlanning(boolean useAdaptiveQueryPlanning) {
		this.useAdaptiveQueryPlanning = useAdaptiveQueryPlanning;
	}

	/** @return true if adaptive query planning is enabled */
	public boolean useAdaptiveQueryPlanning() {
		return this.useAdaptiveQueryPlanning;
	}

	/** Enable or disable recording of statistics during query execution. */
	public void setRecordQueryStats(boolean recordQueryStats) {
		this.recordQueryStats = recordQueryStats;
	}

	/** @return true if statistics are recorded during query execution */
	public boolean recordQueryStats() {
		return this.recordQueryStats;
	}

        protected static OntModelSpec getReasonerSpec(String specString)
        {
            try {
                return OwlUtils.getReasonerSpec(specString);
            } catch (SADIException e) {
                log.error(e.getMessage(), e);
                return DEFAULT_REASONER;
            }
        }

	/** @return the master registry aggregating all configured SADI registries */
	protected MultiRegistry getRegistry()
	{
		return registry;
	}

	/** @return the predicate statistics database; may be null if initialization failed */
	protected PredicateStatsDB getStatsDB()
	{
		return this.statsDB;
	}

	/**
	 * Release the resources held by this knowledge base.  Only the data model
	 * is closed; closing the reasoning model is intentionally disabled (see
	 * the comment below).
	 */
	public void dispose()
	{
		/* this is not working because (unbelievably) it closes the root OWL ontology model...
		 * clearly Jena is not imagining a world where multiple reasoners coexist in the
		 * same VM long-term...
		 */
//		/* before we close our reasoning model, we have to remove all sub-
//		 * models, or they'll be closed too (this is mostly a problem because
//		 * it includes imported models, which are reused by Jena later...)
//		 * (remove with no rebind so that it's quicker, since we're about to
//		 * close this reasoning model anyway...)
//		 */
//		for (OntModel subModel: reasoningModel.listSubModels().toList()) {
//			reasoningModel.removeSubModel(subModel);
//		}
//		reasoningModel.close();

		dataModel.close();

		/* this shouldn't actually be necessary, as closing the models should
		 * free up the resources associated with the RDFNodes in the variable
		 * bindings...
		 */
		variableBindings.clear();
	}

	/** @return the plain data model that accumulates service output */
	public Model getDataModel()
	{
		return dataModel;
	}

	/** @return the ontology model used for reasoning */
	public OntModel getReasoningModel()
	{
		return reasoningModel;
	}

	/**
	 * Parse and execute the given SPARQL query string using the configured
	 * syntax and the default pattern-ordering strategy (or adaptive planning
	 * if enabled).
	 * @param query the SPARQL query string
	 */
	public void executeQuery(String query)
	{
		executeQuery(QueryFactory.create(query, getQuerySyntax()));
	}

	/**
	 * Parse and execute the given SPARQL query string with an explicit
	 * pattern-ordering strategy.
	 * @param query the SPARQL query string
	 * @param strategy the pattern-ordering strategy to use
	 */
	public void executeQuery(String query, QueryPatternOrderingStrategy strategy)
	{
		executeQuery(QueryFactory.create(query, getQuerySyntax()), strategy);
	}

	/**
	 * Execute a parsed query, choosing adaptive planning if it is enabled,
	 * otherwise the default static pattern ordering.
	 * @param query the parsed query
	 */
	public void executeQuery(Query query)
	{
		if (useAdaptiveQueryPlanning()) {
			executeQueryAdaptive(query);
		} else {
			executeQuery(query, new DefaultQueryPatternOrderingStrategy());
		}
	}

	/**
	 * Execute a parsed query with an explicit pattern-ordering strategy:
	 * load any FROM graphs, order the patterns (falling back to the
	 * enumeration order if ordering fails), compute constraints, then
	 * resolve each pattern in turn.
	 * @param query the parsed query
	 * @param strategy the pattern-ordering strategy to use
	 */
	public void executeQuery(Query query, QueryPatternOrderingStrategy strategy)
	{
		loadFromClauses(query);

		List<Triple> patterns = new QueryPatternEnumerator(query).getQueryPatterns();
		try {
			patterns = strategy.orderPatterns(patterns);
		} catch (UnresolvableQueryException e) {
			// fall through with the unordered patterns
			log.error(String.format("failed to order query %s with strategy %s", query, strategy), e);
		}

		computeConstraints(patterns);

		for (Triple pattern : patterns)
			processPattern(pattern);
	}

	/**
	 * Execute a parsed query with adaptive planning: after each pattern is
	 * resolved, the cheapest remaining pattern (per QueryPatternComparator)
	 * is selected next, so the plan reacts to data gathered so far.
	 * @param query the parsed query
	 */
	protected void executeQueryAdaptive(Query query)
	{
		loadFromClauses(query);

		log.trace("running query with adaptive query planning");

		List<Triple> queryPatterns = new QueryPatternEnumerator(query).getQueryPatterns();
		Set<Triple> visitedPatterns = new HashSet<Triple>();
		QueryPatternComparator comparator = new QueryPatternComparator();

		computeConstraints(queryPatterns);

		while (visitedPatterns.size() < queryPatterns.size()) {

			StopWatch stopWatch = new StopWatch();
			stopWatch.start();

			// linear scan for the minimum-cost unvisited pattern
			Triple bestPattern = null;
			for (Triple pattern : queryPatterns) {
				if (visitedPatterns.contains(pattern)) {
					continue;
				}
				if (bestPattern == null || comparator.compare(pattern, bestPattern) < 0) {
					bestPattern = pattern;
				}
			}

			stopWatch.stop();
			log.trace(String.format("optimizer selected next pattern for resolution in %dms", stopWatch.getTime()));

			processPattern(bestPattern);
			visitedPatterns.add(bestPattern);
		}
	}

	/**
	 * Load every graph referenced in the query's FROM clauses: URLs ending
	 * in .owl go into the reasoning model, everything else into the data
	 * model (as N3 for .ttl/.n3/.turtle, RDF/XML otherwise).  Failures are
	 * logged and do not abort the query.
	 * @param query the parsed query whose graph URIs should be loaded
	 */
	protected void loadFromClauses(Query query)
	{
		/* load all of the graphs referenced in the FROM clause into the kb
		 * TODO might have to make this configurable or turn it off, if people
		 * are using the FROM clause in other ways...
		 */
		for (String sourceURI : query.getGraphURIs()) {
			try {
				String file = new URL(sourceURI).getFile();
				if (file.endsWith(".owl")) {
					log.info(String.format("reading %s into ontology model", sourceURI));
					reasoningModel.read(sourceURI);
				} else {
					log.info(String.format("reading %s into data model", sourceURI));
					boolean isTurtle = file.endsWith(".ttl") || file.endsWith(".n3") || file.endsWith(".turtle");
					dataModel.read(sourceURI, isTurtle ? "N3" : "RDF/XML");
				}
			} catch (Exception e) {
				log.error(String.format("failed to read FROM graph %s", sourceURI), e);
			}
		}
	}

	/**
	 * Return the (mutable) collection of triple patterns constraining the
	 * given variable, creating and registering an empty one on first access.
	 * @param variable the variable node
	 * @return the live constraint collection for that variable
	 */
	protected Collection<Triple> getConstraints(Node variable)
	{
		Collection<Triple> variableConstraints = constraints.get(variable);
		if (variableConstraints == null) {
			variableConstraints = new ArrayList<Triple>();
			constraints.put(variable, variableConstraints);
		}
		return variableConstraints;
	}

	/**
	 * Record, for each variable appearing in the subject position, the
	 * patterns with a concrete object: those patterns constrain the values
	 * that variable can take.
	 * @param queryPatterns all triple patterns of the query
	 */
	protected void computeConstraints(List<Triple> queryPatterns)
	{
		for (Triple pattern : queryPatterns) {
			Node subject = pattern.getSubject();
			// only variable-subject / concrete-object patterns act as constraints
			if (subject.isVariable() && pattern.getObject().isConcrete()) {
				getConstraints(subject).add(pattern);
			}
		}
	}

	/** direction in which a pattern is resolved: subject-to-object (FORWARD) or object-to-subject via inverse properties (REVERSE) */
	private enum ResolutionDirection { FORWARD, REVERSE }

	/**
	 * Resolve a single triple pattern: expand each position into candidate
	 * values, try each resolution direction in preference order until one
	 * yields data, then (for rdf:type patterns) decompose the class, and
	 * finally record the discovered variable bindings.
	 */
	private void processPattern(Triple pattern)
	{
		log.trace(String.format("query pattern %s", pattern));

		StopWatch stopWatch = new StopWatch();
		stopWatch.start();

		PotentialValues subjects = expandQueryNode(pattern.getSubject());
		PotentialValues predicates = expandQueryNode(pattern.getPredicate());
		PotentialValues objects = expandQueryNode(pattern.getObject());

		// an empty predicate expansion means the predicate is an unbound variable
		if (!allowPredicateVariables && predicates.isEmpty()) {
			log.error("variables are not permitted in the predicate position of triple patterns");
			return;
		}

		Set<OntProperty> properties = getOntProperties(RdfUtils.extractResources(predicates.values));

		/* if the predicate position has bindings, but none of them are URIs,
		 * then this pattern has no solutions.
		 */
		if (!predicates.isEmpty() && properties.size() == 0) {
			log.trace(String.format("pattern has no solution, all bindings for predicate var %s are non-URI values", predicates.key));
			return;
		}

		// stop at the first direction that actually retrieves data
		for (ResolutionDirection direction: computeResolutionDirectionPreference(pattern, subjects, objects)) {
			boolean directionIsForward = direction.equals(ResolutionDirection.FORWARD);
			log.trace(String.format("resolving pattern %s in %s direction", pattern, directionIsForward ? "forward" : "reverse"));
			boolean results = gatherTriples(pattern, subjects, properties, objects, directionIsForward);
			if (results) {
				if (recordQueryStats()) {
					recordStats(subjects, predicates, objects, directionIsForward, (int)stopWatch.getTime());
				}
				break;
			}
		}

		/* note: this must come after normal processing of the triple pattern,
		 * so that rdf:type patterns are also resolved against SPARQL endpoints.
		 * --BV
		 */
		Node p = pattern.getPredicate();
		if (p.isURI() && p.getURI().equals(RDF.type.getURI())) {
			processTypePattern(pattern);
		}

		// commit the candidate values discovered above to the variable bindings
		if (this.intersectVariableBindings) {
			populateVariableBindingWithIntersection(subjects, predicates, objects);
		} else {
			populateVariableBinding(subjects, predicates, objects);
		}

		stopWatch.stop();
		log.trace(String.format("resolved pattern %s in %dms", pattern, stopWatch.getTime()));
	}

	/**
	 * Decide the order in which to try resolution directions for a pattern.
	 * FORWARD is preferred except when only the object is bound; when both
	 * ends are bound and adaptive planning is on, the stats-based comparator
	 * picks the preferred direction.
	 * @return the two directions, in preference order
	 */
	private List<ResolutionDirection> computeResolutionDirectionPreference(Triple pattern, PotentialValues subjects, PotentialValues objects)
	{
		boolean reverseFirst;
		if (!subjects.isEmpty() && !objects.isEmpty() && useAdaptiveQueryPlanning()) {
			// both ends bound and adaptive planning enabled: ask the comparator
			reverseFirst = !(new QueryPatternComparator()).bestDirectionIsForward(pattern);
		} else {
			// otherwise FORWARD first, unless only the object is bound
			reverseFirst = subjects.isEmpty() && !objects.isEmpty();
		}

		List<ResolutionDirection> resolutionPreference = new ArrayList<ResolutionDirection>(ResolutionDirection.values().length);
		if (reverseFirst) {
			resolutionPreference.add(ResolutionDirection.REVERSE);
			resolutionPreference.add(ResolutionDirection.FORWARD);
		} else {
			resolutionPreference.add(ResolutionDirection.FORWARD);
			resolutionPreference.add(ResolutionDirection.REVERSE);
		}
		return resolutionPreference;
	}

	/**
	 * Resolve a pattern in the given direction.  In the reverse direction
	 * the roles of subject and object are exchanged and the inverse
	 * properties are used.  If the effective subjects are unbound, they are
	 * discovered dynamically (when resolveUnboundPatterns is on) or the
	 * pattern is skipped.
	 * @return true if any data was retrieved
	 */
	private boolean gatherTriples(Triple pattern, PotentialValues subjects, Set<OntProperty> properties, PotentialValues objects, boolean directionIsForward)
	{
		PotentialValues sources = subjects;
		PotentialValues targets = objects;
		Set<OntProperty> effectiveProperties = properties;
		if (!directionIsForward) {
			sources = objects;
			targets = subjects;
			effectiveProperties = getInverseProperties(properties);
		}

		if (sources.isEmpty()) {
			if (!resolveUnboundPatterns) {
				log.warn(String.format("skipping query pattern %s because of unbound variable", pattern));
				return false;
			}
			findSubjects(sources, effectiveProperties, targets);
		}

		return gatherTriples(sources, effectiveProperties, targets);
	}

	/* try to find subjects by looking for instances of input
	 * classes for services that generate the required property...
	 */
	/**
	 * Dynamically discover potential subjects by finding services that
	 * generate the required properties and collecting instances of their
	 * input classes from the reasoning model.  Discovered instances are
	 * added to the subjects accumulator.
	 * @return the (service, inputs) pairs that were discovered
	 */
	private Collection<ServiceInputPair> findSubjects(PotentialValues subjects, Collection<OntProperty> properties, PotentialValues objects)
	{
		log.debug(String.format("dynamically discovering potential subjects for %s", properties));
		Collection<ServiceInputPair> results = new ArrayList<ServiceInputPair>();
		for (Service service : getServicesByPredicate(properties, getConstraints(objects.variable))) {
			try {
				Collection<Resource> inputInstances = service.discoverInputInstances(reasoningModel);
				log.trace(String.format("found service %s with %d inputs", service, inputInstances.size()));
				subjects.values.addAll(inputInstances);
				results.add(new ServiceInputPair(service, inputInstances));
			} catch (SADIException e) {
				log.error("error discovering input instances" , e);
			}
		}
		return results;
	}

	/**
	 * Map a collection of predicate resources to OntProperties in the
	 * reasoning model, skipping (and logging) anonymous predicates.
	 * @param predicates the predicate resources to expand
	 * @return the corresponding set of OntProperties
	 */
	protected Set<OntProperty> getOntProperties(Collection<Resource> predicates)
	{
		log.trace(String.format("expanding properties %s", predicates));
		Set<OntProperty> predicateSet = new HashSet<OntProperty>();
		for (Resource predicate : predicates) {
			if (!predicate.isURIResource()) {
				log.warn(String.format("skipping anonymous predicate %s", predicate));
				continue;
			}
			predicateSet.add(getOntProperty(predicate.getURI()));
		}
		return predicateSet;
	}

	/**
	 * Collect the inverse of each property (synthesizing one where needed;
	 * see getInverseProperty), skipping any for which no inverse could be
	 * obtained.
	 * @param properties the properties to invert
	 * @return the set of inverse properties
	 */
	protected Set<OntProperty> getInverseProperties(Collection<OntProperty> properties)
	{
		Set<OntProperty> inverseProperties = new HashSet<OntProperty>();
		for (OntProperty property : properties) {
			OntProperty inverse = getInverseProperty(property);
			if (inverse == null)
				continue;
			inverseProperties.add(inverse);
		}
		return inverseProperties;
	}


	/* this now expands a class definition into the triple patterns
	 * that describe it, so we should be able to use it to convert from
	 * a class description to a SPARQL query...
	 */
	/* this now expands a class definition into the triple patterns
	 * that describe it, so we should be able to use it to convert from
	 * a class description to a SPARQL query...
	 */
	/**
	 * Resolve an rdf:type pattern by decomposing the class expression into
	 * the restrictions that define it, creating and processing a new triple
	 * pattern for each restriction.  Classes already seen for this variable,
	 * or already satisfied by candidate instances, are not decomposed again.
	 *
	 * Fixed typos in log messages only ("resriction" -> "restriction",
	 * "Invidual" -> "Individual", doubled space); logic is unchanged.
	 */
	private void processTypePattern(final Triple pattern)
	{
		final OntClass c = getNodeAsClass(pattern.getObject());
		if (c == null) {
			log.warn(String.format("skipping type pattern with unknown class %s", pattern.getObject()));
			return;
		}

		final PotentialValues subjects = expandQueryNode(pattern.getSubject());
		log.debug(String.format("gathering triples to find instances of %s", resourceToString(c)));

		new VisitingDecomposer(new ca.wilkinsonlab.sadi.decompose.DefaultClassTracker() {
			@Override
			public boolean seen(OntClass c)
			{
				// don't decompose the same class twice for the same variable
				return tracker.beenThere(subjects, c);
			}
		}, new ca.wilkinsonlab.sadi.decompose.DefaultClassVisitor() {
			@Override
			public boolean ignore(OntClass c)
			{
				if (super.ignore(c)) {
					log.trace(String.format("ignoring class %s by default", resourceToString(c)));
					return true;
				}

				/* unbound subject: if the KB already has instances, use them
				 * as candidates instead of decomposing further
				 */
				if (subjects.isEmpty()) {
					List<? extends OntResource> instances = c.listInstances().toList();
					if (!instances.isEmpty()) {
						log.trace(String.format("ignoring class %s because there are instances present that will be assigned to %s", resourceToString(c), subjects.variable));
						subjects.values.addAll(instances);
						return true;
					} else {
						log.trace(String.format("decomposing class %s because there are no instances in the KB", resourceToString(c)));
						return false;
					}
				}

				/* bound subject: skip decomposition if any candidate is
				 * already known to be an instance of the class
				 */
				for (Resource resource: RdfUtils.extractResources(subjects.values)) {
					// can we just cast this instead?
					Individual i = null;
					try {
						i = (Individual)resource;
					} catch (ClassCastException e) {
						if (resource.isURIResource())
							i = reasoningModel.getIndividual(resource.getURI());
						else
							log.warn(String.format("couldn't cast anonymous resource %s as Individual", resource));
					}

					if (i != null && i.hasOntClass(c)) {
						log.trace(String.format("ignoring class %s because candidates of %s are instances", resourceToString(c), subjects.variable));
						return true;
					}
				}
				log.trace(String.format("decomposing class %s because no candidates of %s are instances", resourceToString(c), subjects.variable));
				return false;
			}
		}, new RestrictionAdapter() {
			public void onProperty(OntProperty onProperty)
			{
				// bare restriction on a property: find any value for it
				log.trace(String.format("%s has restriction on property %s", resourceToString(c), resourceToString(onProperty)));
				PotentialValues objects = getNewVariableBinding(onProperty);
				Triple newPattern = Triple.create(pattern.getSubject(), onProperty.asNode(), objects.variable);
				log.trace(String.format("created new pattern %s", newPattern));
				processPattern(newPattern);
			}
			public void hasValue(OntProperty onProperty, RDFNode hasValue)
			{
				// hasValue restriction: the object is fixed, so it also constrains the subject
				log.trace(String.format("%s has restriction on property %s: hasValue %s", resourceToString(c), resourceToString(onProperty), hasValue));
				Triple newPattern = Triple.create(pattern.getSubject(), onProperty.asNode(), hasValue.asNode());
				getConstraints(pattern.getSubject()).add(newPattern);
				log.trace(String.format("created new pattern %s", newPattern));
				processPattern(newPattern);
			}
			public void valuesFrom(OntProperty onProperty, OntResource valuesFrom)
			{
				// someValuesFrom/allValuesFrom: also require the object to have the target type
				log.trace(String.format("%s has restriction on property %s: valuesFrom %s", resourceToString(c), resourceToString(onProperty), resourceToString(valuesFrom)));
				PotentialValues objects = getNewVariableBinding(onProperty);
				Triple newPattern = Triple.create(pattern.getSubject(), onProperty.asNode(), objects.variable);
				getConstraints(pattern.getSubject()).add(newPattern);
				log.trace(String.format("created new pattern %s", newPattern));
				Triple typePattern = Triple.create(objects.variable, RDF.type.asNode(), valuesFrom.asNode());
				getConstraints(objects.variable).add(typePattern);
				log.trace(String.format("created new pattern %s", typePattern));
				processPattern(newPattern);
				processPattern(typePattern);
			}
		}).decompose(c);
	}

	/**
	 * Render a resource for log messages: its URI (or anonymous node id)
	 * followed, when present, by its rdfs:label literals in quotes.
	 * @param resource the resource to render
	 * @return a human-readable description of the resource
	 */
	private String resourceToString(OntResource resource)
	{
		StringBuilder buf = new StringBuilder();
		buf.append(resource.isURIResource() ? resource.getURI() : resource.getId().toString());
		if (resource.hasProperty(RDFS.label)) {
			buf.append(" (");
			for (Statement stmt : resource.listProperties(RDFS.label).toList()) {
				// non-literal labels shouldn't exist, but do; skip them
				if (stmt.getObject().isLiteral()) {
					buf.append(" '").append(stmt.getString()).append("'");
				}
			}
			buf.append(" ) ");
		}
		return buf.toString();
	}

	/**
	 * Interpret a query node as an OntClass: first try the node directly in
	 * the reasoning model (handles anonymous classes created during
	 * decomposition), then try resolving it by URI; returns null for
	 * variables and for anonymous nodes not attached to a class description.
	 * @param node the query node
	 * @return the corresponding OntClass, or null if none can be found
	 */
	private OntClass getNodeAsClass(Node node)
	{
		if (node.isVariable())
			return null;

		/* an already-loaded (frequently anonymous) OntClass can be returned
		 * directly, without going through its URI...
		 */
		RDFNode rdfNode = reasoningModel.getRDFNode(node);
		if (rdfNode != null && rdfNode.canAs(OntClass.class))
			return rdfNode.as(OntClass.class);

		// otherwise, try to resolve the URI into the reasoning model
		if (node.isURI())
			return getOntClass(node.getURI());

		/* nothing left to try: probably an anonymous node that isn't
		 * attached to a class description (unclear how that could happen)
		 */
		return null;
	}

	/* return a set of potential values this query node represents;
	 * this will be empty if the node is an unbound variable.
	 */
	/**
	 * Expand a query node into its set of potential values: the accumulated
	 * binding for a variable (empty if unbound), or a singleton for a
	 * concrete node.
	 * @param queryPatternNode the node from the triple pattern
	 * @return the potential values for that node
	 */
	private PotentialValues expandQueryNode(Node queryPatternNode)
	{
		if (!queryPatternNode.isVariable())
			return new PotentialValues(dataModel.getRDFNode(queryPatternNode));
		return getVariableBinding(queryPatternNode);
	}

	/**
	 * Return the binding record for a variable, creating (and registering)
	 * an empty one the first time the variable is encountered.
	 * @param variable the variable node
	 * @return the live PotentialValues for that variable
	 */
	private PotentialValues getVariableBinding(Node variable)
	{
		String name = variable.getName();
		PotentialValues binding = variableBindings.get(name);
		if (binding == null) {
			log.trace(String.format("first time encountering variable %s", variable));
			binding = new PotentialValues(variable);
			variableBindings.put(name, binding);
		}
		return binding;
	}

	/**
	 * Create a binding for a fresh variable whose name is derived from the
	 * given resource, incrementing the name until it doesn't collide with an
	 * existing binding.
	 *
	 * Fix: removed a stray trailing "while (…);" — a leftover from a
	 * do/while conversion that formed a second, empty-bodied loop after the
	 * real one (dead code, and an infinite-loop hazard if the loop body ever
	 * stopped guaranteeing the condition false).
	 * @param r the resource the variable name is derived from
	 * @return the binding for the newly-created variable
	 */
	private PotentialValues getNewVariableBinding(Resource r)
	{
		String varName = VariableNameFactory.getVariableName(r);
		while (variableBindings.containsKey(varName)) {
			varName = VariableNameFactory.getNextVariableName(varName);
		}
		return getVariableBinding(Node.createVariable(varName));
	}

	/* look up the specified URI in the reasoning model; if it's not there,
	 * try to resolve the URI into the model; if that doesn't work, just
	 * create it.
	 */
	/**
	 * Look up a class URI in the reasoning model, resolving the URI into the
	 * model if necessary; as a last resort, create an empty class with that
	 * URI (and warn).
	 * @param uri the class URI
	 * @return the OntClass, never null
	 */
	private OntClass getOntClass(String uri)
	{
		try {
			OntClass c = OwlUtils.getOntClassWithLoad(reasoningModel, uri);
			if (c != null)
				return c;
		} catch (SADIException e) {
			log.error(String.format("failed to load OWL class %s", uri), e);
		}
		log.warn(String.format("creating undefined and unresolvable class %s", uri));
		return reasoningModel.createClass(uri);
	}

	/* look up the specified URI in the reasoning model; if it's not there,
	 * try to resolve the URI into the model; if that doesn't work, just
	 * create it.
	 * TODO maybe we should create a wrapped OntModel that just does this;
	 * it might be worthwhile given the number of different places we have
	 * to implement this behaviour...
	 */
	/**
	 * Look up a property URI in the reasoning model, resolving the URI into
	 * the model if necessary; as a last resort, create an empty property
	 * with that URI (and warn).
	 * TODO maybe we should create a wrapped OntModel that just does this;
	 * it might be worthwhile given the number of different places we have
	 * to implement this behaviour...
	 * @param uri the property URI
	 * @return the OntProperty, never null
	 */
	protected OntProperty getOntProperty(String uri)
	{
		try {
			OntProperty p = OwlUtils.getOntPropertyWithLoad(reasoningModel, uri);
			if (p != null)
				return p;
		} catch (SADIException e) {
			log.error(String.format("failed to load OWL property %s", uri), e);
		}
		log.warn(String.format("creating undefined and unresolvable property %s", uri));
		return reasoningModel.createOntProperty(uri);
	}

	/**
	 * Return the inverse of a property, synthesizing one (with the
	 * "-inverse" URI suffix) and declaring the mutual owl:inverseOf
	 * relationship when the ontology doesn't define an inverse.
	 * @param p the property to invert
	 * @return the (possibly synthesized) inverse property
	 */
	protected OntProperty getInverseProperty(OntProperty p)
	{
		OntProperty inverse = p.getInverse();
		if (inverse != null)
			return inverse;

		log.warn(String.format("creating inverse property of %s", p.getURI()));
		inverse = reasoningModel.createOntProperty(p.getURI() + AUTOGENERATED_INVERSE_PROPERTY_SUFFIX);
		inverse.addInverseOf(p);
		p.addInverseOf(inverse);
		return inverse;
	}

	/**
	 * Attach inferable rdf:types to every resource in the set (literals are
	 * ignored); see attachType.
	 * @param nodes candidate nodes, possibly mixing resources and literals
	 */
	private void attachTypes(Set<RDFNode> nodes)
	{
		for (RDFNode node : nodes) {
			if (!node.isResource())
				continue;
			attachType(node.as(Resource.class));
		}
	}

	/**
	 * Attach an rdf:type inferable from the resource's URI (via
	 * ResourceTyper), unconditionally: see the note below on why we can't
	 * first check whether the resource already has a type.
	 */
	private void attachType(Resource resource)
	{
		/* NOTE: Resources from OntModels with reasoning (such as reasoningModel) may
		 * infer rdf:types such as rdf:Resource and owl:Thing for every resource,
		 * so this check for an existing rdf:type is not safe. (See Bug 14 in Bugzilla.) --BV
		 */
		//if ( !RdfUtils.isTyped(resource) ) {
		//log.trace(String.format("attaching type to untyped node %s", resource));
		ResourceTyper.getResourceTyper().attachType(resource);
		//}
	}

	/* gather triples matching a predicate, optionally storing the subjects in an accumulator.
	 */
	/**
	 * Gather triples for the candidate subjects by invoking every matching
	 * service (all registered services when the predicate set is empty,
	 * i.e. the predicate was an unbound variable).  Service output is added
	 * to the data model (optionally with materialized inferences), the
	 * reasoner is rebound, and minimal ontologies are loaded for any
	 * undefined output properties.
	 *
	 * Fix: the null-subject warning referred to a nonexistent method name
	 * ("gatherTriplesByPredicate"); it now names this method.
	 * @param subjects candidate subjects (service inputs)
	 * @param predicates properties to resolve; empty means "any predicate"
	 * @param objects candidate objects (used to filter services by constraints)
	 * @return true if any service returned data beyond output-class rdf:type triples
	 */
	private boolean gatherTriples(PotentialValues subjects, Collection<OntProperty> predicates, PotentialValues objects)
	{
		log.debug(String.format("gathering triples with predicates %s", predicates));
		log.trace(String.format("potential subjects %s", subjects));

		if (subjects == null) {
			log.warn("attempt to call gatherTriples with null subject");
			return false;
		} else if (subjects.isEmpty()) {
			log.trace("nothing to do");
			return false;
		}

		/* attach types to untyped nodes where we can infer the type from the
		 * URI...
		 */
		resolveUnknownSubjects(subjects.values);
		attachTypes(subjects.values);

		Collection<Service> services;
		if (predicates.size() == 0) {
			log.trace("predicate is unbound variable, testing inputs against *all* registered services");
			services = getRegistry().getAllServices();
		} else {
			log.trace(String.format("finding services by predicate(s) %s", predicates));
			services = getServicesByPredicate(predicates, getConstraints(objects.variable));
		}

		log.debug(String.format("found %d service%s total", services.size(), services.size() == 1 ? "" : "s"));

		boolean retrievedData = false;

		for (Service service : services) {
			/* copy the collection of potential inputs as they'll be filtered
			 * for each service...
			 */
			Set<RDFNode> inputs = new HashSet<RDFNode>(subjects.values);
			Model output = maybeCallService(service, inputs);
//			if (!output.isEmpty())
//				skipInputClasses.clear();

			/* add the service output to the data model */
			for (Triple triple: RdfUtils.modelToTriples(output)) {
				if (this.storeInferredTriples) {
					addTripleWithInferences(triple);
				} else {
					log.trace(String.format("adding triple to data model %s", triple));
					RdfUtils.addTripleToModel(dataModel, triple);
				}
				/* rdf:type triples naming the service's output class don't
				 * count as new data...
				 */
				if (!retrievedData && !isOutputClassTypeTriple(triple, service)) {
					retrievedData = true;
				}
			}
			reasoningModel.rebind();

			/* load minimal ontologies for any undefined properties
			 * that appear in the output data
			 */
			for (Triple triple: RdfUtils.modelToTriples(output)) {
				if (triple.getPredicate().isURI()) {
					getOntProperty(triple.getPredicate().getURI());
				}
			}
		}

		return retrievedData;
	}

	/**
	 * For each URI subject that has no statements in the data model yet,
	 * dereference its URI and extract the minimal ontology about it into
	 * the data model.  Failures are logged and skipped.
	 * @param subjects candidate subject nodes (literals are ignored)
	 */
	private void resolveUnknownSubjects(Set<RDFNode> subjects)
	{
		for (Resource subject : RdfUtils.extractResources(subjects)) {
			if (!subject.isURIResource())
				continue;
			if (dataModel.contains(subject, (Property)null, (RDFNode)null))
				continue;

			log.info(String.format("resolving unknown subject %s", subject));
			try {
				Model model = ModelFactory.createDefaultModel();
				model.read(subject.getURI());
				OwlUtils.extractMinimalOntology(dataModel, model, subject.getURI());
				model.close();
			} catch (Exception e) {
				log.error(String.format("error resolving %s: %s", subject, e.toString()), e);
			}
		}
	}

	/**
	 * True if the triple merely types its subject as the service's output
	 * class (such triples don't count as retrieved data).
	 *
	 * Fix: guard against a non-URI predicate; Jena's Node.getURI() throws
	 * on blank/variable/literal nodes rather than returning null.
	 * @param triple a triple from service output
	 * @param service the service that produced it
	 * @return true if the triple is an rdf:type assertion of the service's output class
	 */
	private boolean isOutputClassTypeTriple(Triple triple, Service service)
	{
		Node predicate = triple.getPredicate();
		return predicate.isURI() &&
		       RDF.type.getURI().equals(predicate.getURI()) &&
		       triple.getObject().hasURI(service.getOutputClassURI());
	}

	/**
	 * Determine whether calling the given service is guaranteed to violate
	 * the concrete constraints on a query variable: for each restriction
	 * the service's output class attaches via property p (beyond what the
	 * input class already asserts), check the constraints against the
	 * restriction's valuesFrom class.
	 *
	 * @param service the candidate service
	 * @param p the property through which the service attaches values
	 * @param variableConstraints other triple patterns constraining the variable
	 * @return true if the service cannot possibly satisfy the constraints
	 */
	protected boolean violatesConstraints(Service service, OntProperty p, Collection<Triple> variableConstraints)
	{
		/* TODO replace this whole bit with Service.listRestrictions() or Service.listRestrictionsOnProperty()
		 * once those are in the general service contract...
		 */
		OntClass inputClass = null;
		OntClass outputClass = null;
		try {
			inputClass = service.getInputClass();
			outputClass = service.getOutputClass();
		} catch (SADIException e) {
			log.error("error getting input/output class", e);
		}
		// FIX: this message previously said "no output class" for the input-class case
		if (inputClass == null || inputClass.equals(OWL.Nothing)) {
			log.trace("no input class; no constraints to violate");
			return false;
		}
		if (outputClass == null || outputClass.equals(OWL.Nothing)) {
			log.trace("no output class; no constraints to violate");
			return false;
		}

		/* collect the restrictions the output class adds on p (or a
		 * super-property of p)...
		 */
		Collection<Restriction> restrictions = new ArrayList<Restriction>();
		for (Restriction r: OwlUtils.listRestrictions(outputClass, inputClass)) {
			OntProperty attachedProperty = r.getOnProperty();
			if (p.equals(attachedProperty) || p.hasSubProperty(attachedProperty, false)) {
				restrictions.add(r);
			}
		}

		for (Triple constraint: variableConstraints) {
			if (constraintViolatesRestrictions(constraint, restrictions))
				return true;
		}
		return false;
	}

	/**
	 * Check a single variable constraint against the restrictions a service
	 * attaches.  Only rdf:type constraints are considered; a conflict is
	 * reported only when some valuesFrom class is known and none of them
	 * overlaps the constrained type.
	 */
	private boolean constraintViolatesRestrictions(Triple constraint,
			Collection<Restriction> restrictions) {
		// non-type constraints are never treated as violations here
		if (!constraint.getPredicate().equals(RDF.type.asNode()))
			return false;

		OntClass constrainedType = getNodeAsClass(constraint.getObject());
		if (constrainedType == null) {
			log.trace(String.format("skipping type constraint with unknown class %s", constraint.getObject()));
			return false;
		}

		boolean conflict = false;
		for (Restriction restriction: restrictions) {
			OntClass attachedValuesFrom = OwlUtils.getValuesFromAsClass(restriction);
			if (attachedValuesFrom == null)
				continue;
			if (classesOverlap(constrainedType, attachedValuesFrom)) { // hasSuperClass because we might be building up; this is non-ideal...
				if (log.isDebugEnabled())
					log.debug(String.format("service attaches values of type %s which is compatible with type %s", LabelUtils.getLabel(attachedValuesFrom), LabelUtils.getLabel(constrainedType)));
				return false;
			}
			if (log.isDebugEnabled())
				log.debug(String.format("service attaches values of type %s and we're looking for values of type %s", LabelUtils.getLabel(attachedValuesFrom), LabelUtils.getLabel(constrainedType)));
			conflict = true;
		}
		return conflict;
	}

	/**
	 * Test whether two class expressions can overlap.  Boolean class
	 * expressions (intersections and unions) on either side are decomposed
	 * recursively: the expression overlaps if any operand overlaps the
	 * other class.  Atomic classes overlap when they are equal or related
	 * by a subclass axiom in either direction.
	 */
	private static boolean classesOverlap(OntClass constrainedType, OntClass attachedValuesFrom)
	{
		if (constrainedType.isIntersectionClass()) {
			for (OntClass operand: constrainedType.asIntersectionClass().listOperands().toList()) {
				if (classesOverlap(operand, attachedValuesFrom))
					return true;
			}
			return false;
		}
		if (attachedValuesFrom.isIntersectionClass()) {
			for (OntClass operand: attachedValuesFrom.asIntersectionClass().listOperands().toList()) {
				if (classesOverlap(constrainedType, operand))
					return true;
			}
			return false;
		}
		if (constrainedType.isUnionClass()) {
			for (OntClass operand: constrainedType.asUnionClass().listOperands().toList()) {
				if (classesOverlap(operand, attachedValuesFrom))
					return true;
			}
			return false;
		}
		if (attachedValuesFrom.isUnionClass()) {
			for (OntClass operand: attachedValuesFrom.asUnionClass().listOperands().toList()) {
				if (classesOverlap(constrainedType, operand))
					return true;
			}
			return false;
		}
		// atomic classes
		return constrainedType.equals(attachedValuesFrom)
			|| constrainedType.hasSubClass(attachedValuesFrom)
			|| constrainedType.hasSuperClass(attachedValuesFrom);
	}

	/**
	 * Add the given triple to the data model along with the triples implied
	 * by equivalent-property and inverse-property relationships.
	 * Inferences from subproperty relationships are not yet added.
	 *
	 * @param triple The triple to add to the data model.
	 */
	protected void addTripleWithInferences(Triple triple)
	{
		// TODO: Add triples implied by subproperty relationships.
		// TODO: Don't rely on OwlUtils.getEquivalentProperties() and
		// SHAREKnowledgeBase.getInverseProperty(); these methods
		// assume that inverseOf/equivalentProperty relationships are
		// asserted in both directions, or that we are using
		// a model with inferencing.

		log.trace(String.format("adding triple %s and its inferences to the data model", triple));

		Resource subject = RdfUtils.getResource(dataModel, triple.getSubject());
		OntProperty predicate = getOntProperty(triple.getPredicate().getURI());
		RDFNode object = RdfUtils.getRDFNode(dataModel, triple.getObject());

		// assert the triple under the predicate and every equivalent property
		for (OntProperty equivalent: OwlUtils.getEquivalentProperties(predicate)) {
			log.trace(String.format("adding triple to data model: (%s, %s, %s)", subject, equivalent, object));
			dataModel.add(subject, equivalent, object);
		}

		// for object properties, also assert the reversed triple under the
		// inverse property and its equivalents
		if (predicate.isObjectProperty() && object.isResource()) {
			OntProperty inverse = getInverseProperty(predicate);
			for (OntProperty equivalent: OwlUtils.getEquivalentProperties(inverse)) {
				// don't record triples that use fake inverse properties
				if (equivalent.getURI().endsWith(AUTOGENERATED_INVERSE_PROPERTY_SUFFIX))
					continue;
				log.trace(String.format("adding triple to data model: (%s, %s, %s)", object.asResource(), equivalent, subject));
				dataModel.add(object.asResource(), equivalent, subject);
			}
		}
	}

	/**
	 * Replace each variable's bindings with the values that actually occur
	 * in statements matching the pattern, effectively intersecting any
	 * existing bindings with the pattern's solutions.  Concrete (non-variable)
	 * positions are left untouched.
	 *
	 * @param subjects potential values in the subject position
	 * @param predicates potential values in the predicate position
	 * @param objects potential values in the object position
	 */
	protected void populateVariableBindingWithIntersection(PotentialValues subjects, PotentialValues predicates, PotentialValues objects)
	{
		// remember which positions were variables with existing bindings,
		// so we can log the intersection below
		boolean sIsBoundVar = subjects.isVariable() && !subjects.isEmpty();
		boolean pIsBoundVar = predicates.isVariable() && !predicates.isEmpty();
		boolean oIsBoundVar = objects.isVariable() && !objects.isEmpty();

		Set<RDFNode> sValues = new HashSet<RDFNode>();
		Set<RDFNode> pValues = new HashSet<RDFNode>();
		Set<RDFNode> oValues = new HashSet<RDFNode>();

		for (Statement statement : getStatements(subjects, predicates, objects))
		{
			sValues.add(statement.getSubject());
			pValues.add(statement.getPredicate());
			oValues.add(statement.getObject());
		}

		if (subjects.isVariable())
			intersectBindings(subjects, sValues, sIsBoundVar);
		if (predicates.isVariable())
			intersectBindings(predicates, pValues, pIsBoundVar);
		if (objects.isVariable())
			intersectBindings(objects, oValues, oIsBoundVar);
	}

	/** Log and assign the intersected binding set for a single variable. */
	private void intersectBindings(PotentialValues variable, Set<RDFNode> newValues, boolean wasBound)
	{
		if (wasBound) {
			log.trace(String.format("pattern has %d solutions for %s that match existing bindings for %s (%s has %d existing bindings)",
					newValues.size(),
					variable.variable,
					variable.variable,
					variable.variable,
					variable.values.size()));
		}
		log.trace(String.format("assigning %d bindings to variable %s", newValues.size(), variable.variable));
		variable.setBindings(newValues);
	}

	/**
	 * Add bindings to any position that is an unbound variable, using the
	 * statements that match the pattern.  Positions that already have
	 * bindings (or are concrete) are left untouched.
	 */
	protected void populateVariableBinding(PotentialValues subjects, PotentialValues predicates, PotentialValues objects)
	{
		boolean fillSubjects = subjects.isEmpty();
		boolean fillPredicates = predicates.isEmpty();
		boolean fillObjects = objects.isEmpty();

		for (Statement statement : getStatements(subjects, predicates, objects)) {
			if (fillSubjects)
				subjects.add(statement.getSubject());
			if (fillPredicates)
				predicates.add(statement.getPredicate());
			if (fillObjects)
				objects.add(statement.getObject());
		}

		if (fillSubjects)
			log.trace(String.format("assigned %d bindings to variable %s", subjects.values.size(), subjects.variable));
		if (fillPredicates)
			log.trace(String.format("assigned %d bindings to variable %s", predicates.values.size(), predicates.variable));
		if (fillObjects)
			log.trace(String.format("assigned %d bindings to variable %s", objects.values.size(), objects.variable));
	}

	/**
	 * Record a response-time sample for the (single) predicate of a
	 * resolved pattern, in both the sampled direction and the inverse
	 * direction for the predicate's inverse properties.
	 *
	 * @param subjects bindings in the subject position
	 * @param predicates bindings in the predicate position
	 * @param objects bindings in the object position
	 * @param directionIsForward true if inputs were subjects, false if objects
	 * @param responseTime the observed resolution time
	 */
	protected void recordStats(PotentialValues subjects, PotentialValues predicates, PotentialValues objects, boolean directionIsForward, int responseTime)
	{
		if (getStatsDB() == null) {
			log.error("statsDB was not successfully initialized, skipping recording of stats");
			return;
		}

		// the input side of the pattern must have bindings
		if (directionIsForward ? subjects.isEmpty() : objects.isEmpty()) {
			log.error("non-sensical value for directionIsForward, skipping recording of stats");
			return;
		}

		Set<OntProperty> properties = getOntProperties(RdfUtils.extractResources(predicates.values));

		if (!predicates.isEmpty() && properties.isEmpty()) {
			log.trace("predicate has no bindings that are URIs, skipping recording of stats");
			return;
		}

		/* an unbound predicate variable, or one with multiple bindings,
		 * leaves us unable to attribute the resolution time to any single
		 * predicate, so no stats can be recorded
		 */
		if (predicates.isEmpty() || properties.size() > 1) {
			log.trace("pattern involves multiple predicates, skipping recording of stats");
			return;
		}

		/* a pattern with no solutions is assumed to be a user error in
		 * formulating the query; don't record stats for it
		 */
		if (getStatements(subjects, predicates, objects).isEmpty()) {
			log.trace("pattern has no solutions, skipping recording of stats");
			return;
		}

		OntProperty property = properties.iterator().next();
		int numInputs = directionIsForward ? subjects.values.size() : objects.values.size();

		for (OntProperty equivalent : OwlUtils.getEquivalentProperties(property)) {
			getStatsDB().recordSample(equivalent, directionIsForward, numInputs, responseTime);
		}

		for (OntProperty inverse : getInverseProperties(property)) {
			if (!inverse.getURI().endsWith(AUTOGENERATED_INVERSE_PROPERTY_SUFFIX)) {
				getStatsDB().recordSample(inverse, !directionIsForward, numInputs, responseTime);
			}
		}
	}

	/**
	 * List the statements in the reasoning model that match the given
	 * pattern.  An empty PotentialValues acts as a wildcard in its position.
	 */
	protected Collection<Statement> getStatements(PotentialValues subjects, PotentialValues predicates, PotentialValues objects)
	{
		// a singleton null acts as a wildcard in reasoningModel.listStatements() below
		Collection<? extends Resource> subjectSet;
		if (subjects.isEmpty())
			subjectSet = Collections.singleton((Resource)null);
		else
			subjectSet = RdfUtils.extractResources(subjects.values);

		Collection<? extends Property> predicateSet;
		if (predicates.isEmpty())
			predicateSet = Collections.singleton((Property)null);
		else
			predicateSet = getOntProperties(RdfUtils.extractResources(predicates.values));

		Collection<? extends RDFNode> objectSet;
		if (objects.isEmpty())
			objectSet = Collections.singleton((RDFNode)null);
		else
			objectSet = objects.values;

		Collection<Statement> statements = new ArrayList<Statement>();
		for (Resource s : subjectSet) {
			for (Property p : predicateSet) {
				for (RDFNode o : objectSet) {
					statements.addAll(reasoningModel.listStatements(s, p, o).toList());
				}
			}
		}
		return statements;
	}

//	Set<String> skipInputClasses = new HashSet<String>();
	/**
	 * Invoke the service on the given subjects unless it is known dead or
	 * no valid, previously-unsent inputs remain after filtering.
	 * @return the service output, or an empty model if the call was skipped
	 */
	private Model maybeCallService(Service service, Set<RDFNode> subjects)
	{
		log.trace(String.format("found service %s", service));

		if (deadServices.contains(service.getURI())) {
			log.debug(String.format("skipping dead service %s", service));
			return ModelFactory.createDefaultModel();
		}

		// drop inputs we've already sent to this service...
		log.trace(String.format("filtering inputs previously sent to service %s", service));
		Collection<RDFNode> alreadySent = new ArrayList<RDFNode>();
		for (RDFNode input: subjects) {
			if (tracker.beenThere(service, input, false)) {
				log.trace(String.format("skipping input %s (been there)", input));
				alreadySent.add(input);
			}
		}
		subjects.removeAll(alreadySent);
		if (subjects.isEmpty()) {
			log.trace("nothing left to do");
			return ModelFactory.createDefaultModel();
		}

		// ...and inputs that aren't instances of the service's input class
		filterByInputClass(subjects, service);
		if (subjects.isEmpty()) {
			log.trace("nothing left to do");
			return ModelFactory.createDefaultModel();
		}

		/* TODO filter by output class, too:
		 * look at additional constraints on the subject variable
		 * iterate through restrictions
		 * 	valuesFrom restriction?
		 * 		remove service if there's a type constraint for an incompatible type
		 *  hasValue restriction?
		 *  	remove service if there's a conflicting value constraint
		 */

		return invokeService(service, subjects);
	}

	/* TODO make this less clunky; probably the service interface should take
	 * the OntProperty instead of just the URI, then it can return services
	 * that match the synonyms as well...
	 */
	private Set<Service> getServicesByPredicate(Collection<OntProperty> predicates, Collection<Triple> variableConstraints)
	{
		// expand the predicate set with all equivalent properties...
		Set<OntProperty> equivalentProperties = new HashSet<OntProperty>();
		for (OntProperty predicate : predicates) {
			equivalentProperties.addAll(OwlUtils.getEquivalentProperties(predicate, true));
		}

		// ...then collect the services attached to any of them, excluding
		// services whose output can't satisfy the variable's constraints
		Set<Service> services = new HashSet<Service>();
		for (OntProperty equivalentProperty: equivalentProperties) {
			log.trace(String.format("finding services for equivalent property %s", resourceToString(equivalentProperty)));
			Collection<? extends Service> candidates;
			try {
				candidates = getRegistry().findServicesByAttachedProperty(equivalentProperty);
			} catch (SADIException e) {
				log.error(e.getMessage(), e);
				candidates = Collections.emptyList();
			}
			for (Service service: candidates) {
				if (violatesConstraints(service, equivalentProperty, variableConstraints)) {
					log.debug(String.format("excluding service %s because it violates constraints on variable", service));
					continue;
				}
				log.trace(String.format("found service %s for equivalent property %s", service, equivalentProperty));
				services.add(service);
			}
		}

		return services;
	}

	/** List all properties asserted to be inverse to p. */
	protected Set<OntProperty> getInverseProperties(OntProperty p)
	{
		log.trace(String.format("finding all properties inverse to %s", p));

		Set<OntProperty> inverseProperties = new HashSet<OntProperty>();
		for (Iterator<? extends OntProperty> i = p.listInverse(); i.hasNext(); ) {
			OntProperty inverse = i.next();
			log.trace(String.format("found inverse property %s", inverse));
			inverseProperties.add(inverse);
		}
		return inverseProperties;
	}

	/* TODO fix this redundancy by making it so that Services can return
	 * null for getInputClass (in which case, we'll call isInputInstance
	 * on the collection), or an OntClass (in which case, we'll load it
	 * in our own ontology and check in bulk...)
	 * allow isInputInstance on collection without be cumbersome by
	 * implementing AbstractService, which others can extend to implement
	 * the multiple methods of the interface without all the tedium...
	 */
	/**
	 * Remove from subjects any node that is not a valid input to the
	 * service.  If dynamic input instance classification is enabled,
	 * additionally use SADI itself to try to attach the properties the
	 * filtered-out nodes need in order to satisfy the service's input
	 * class description.
	 *
	 * @param subjects the candidate input nodes; filtered in place
	 * @param service the service whose input class is the filter
	 */
	private void filterByInputClass(Set<RDFNode> subjects, Service service)
	{
		log.trace(String.format("filtering inputs to service %s by input class", service));

		/* keep an unfiltered copy around so we can try using SADI to try and
		 * satisfy the input class description (if so configured...)
		 * FIX: this previously called the capacity constructor
		 * (new HashSet<RDFNode>(subjects.size())), which left the copy
		 * empty, so the dynamic classification below never saw any nodes.
		 */
		Set<RDFNode> unfiltered = new HashSet<RDFNode>(subjects);

		filterByInputClassInBulk(subjects, service);

		/* if so configured, use SADI to attempt to dynamically attach
		 * properties to the failed nodes so that they will pass...
		 */
		if (dynamicInputInstanceClassification) {
			OntClass inputClass = null;
			try {
				inputClass = service.getInputClass();
			} catch (SADIException e) {
				log.error(String.format("error loading input class for service %s", service), e);
			}
			if (inputClass != null) {
				log.trace(String.format("using SADI to test membership in %s", inputClass));

				// collect only the nodes that failed the bulk filter
				PotentialValues s = getNewVariableBinding(null);
				for (RDFNode node: unfiltered) {
					if (!subjects.contains(node))
						s.add(node);
				}
				Triple typePattern = Triple.create(s.variable, RDF.type.asNode(), inputClass.asNode());
				processPattern(typePattern);
			}
		}
	}

	/**
	 * Remove from subjects any node that is not an instance of the
	 * service's input class, testing all of the subjects in bulk against
	 * a single class expression where possible.
	 *
	 * @param subjects the candidate input nodes; filtered in place
	 * @param service the service whose input class is the filter
	 */
	private void filterByInputClassInBulk(Set<RDFNode> subjects, Service service)
	{
		/* I really, really hate this special case coding, but if we want
		 * to be able to use literals as input to the SPARQL services
		 * (even though the SADI spec explicitly disallows this), we have
		 * to do it this way...
		 */
		// (use OWL.Nothing.getURI() for consistency with the checks in
		// violatesConstraints rather than a hard-coded URI string)
		if (service.getInputClassURI() == null ||
			service.getInputClassURI().equals(OWL.Nothing.getURI())) {
			filterByInputClassIndividually(subjects, service);
			return;
		}

		log.trace(String.format("finding instances of %s", service.getInputClassURI()));
		OntClass inputClass = reasoningModel.getOntClass(service.getInputClassURI());
		Set<String> instanceURIs = new HashSet<String>();

		/* if the service input class is in our reasoning model already,
		 * the user has explicitly imported it; in this case, there's no need
		 * to go through the whole segregation rigamarole...
		 */
		if (inputClass != null) {
			for (OntResource r: inputClass.listInstances().toList()) {
				instanceURIs.add(r.getURI());
			}
		} else {
			try {
				inputClass = service.getInputClass();
			} catch (SADIException e) {
				log.error(String.format("error loading input class for service %s; skipping service", service), e);
				subjects.clear();
				return;
			}

			/* TODO this will cause a problem if different ontologies accessed in
			 * the same query have conflicting definitions; we'll need a
			 * really descriptive error message for when this happens...
			 */
			reasoningModel.addSubModel(inputClass.getOntModel());
			OntClass inOurModel = reasoningModel.getOntClass(inputClass.getURI());
			if (inOurModel == null) {
				log.warn(String.format("service %s input class %s is undefined", service, inputClass));
			} else {
				for (OntResource r: inOurModel.listInstances().toList()) {
					instanceURIs.add(r.getURI());
				}
			}
			reasoningModel.removeSubModel(inputClass.getOntModel());
		}
		// NOTE(review): anonymous instances contribute null to instanceURIs,
		// which would let blank-node subjects pass the filter below; confirm
		// instances here are always URI resources
		for (Iterator<? extends RDFNode> i = subjects.iterator(); i.hasNext(); ) {
			RDFNode node = i.next();
			if (node.isResource() && instanceURIs.contains(node.as(Resource.class).getURI())) {
				log.trace(String.format("%s is a valid input to %s", node, service));
			} else {
				log.trace(String.format("%s is an invalid input to %s", node, service));
				i.remove();
			}
		}
	}

	/** Test each candidate input against the service one at a time, removing failures. */
	private void filterByInputClassIndividually(Set<RDFNode> subjects, Service service)
	{
		Iterator<? extends RDFNode> candidates = subjects.iterator();
		while (candidates.hasNext()) {
			RDFNode candidate = candidates.next();
			log.trace(String.format("checking if %s is a valid input to %s", candidate, service));
			if (isValidInput(candidate, service)) {
				log.trace(String.format("%s is a valid input to %s", candidate, service));
			} else {
				log.trace(String.format("%s is not a valid input to %s", candidate, service));
				candidates.remove();
			}
		}
	}

	/**
	 * Check whether a node is a valid input to the service.  Resources are
	 * checked with Service.isInputInstance; literals are accepted only for
	 * "inverted" SPARQL services (the SADI spec itself disallows literal
	 * inputs, so this is a special case).
	 */
	private boolean isValidInput(RDFNode input, Service service)
	{
		if (input.isResource()) {
			try {
				return service.isInputInstance(input.as(Resource.class));
			} catch (SADIException e) {
				log.error("error in isInputInstance", e);
				return false;
			}
		}
		// a literal is only allowed as input if it is an "inverted" SPARQL service
		return input.isLiteral()
			&& service instanceof SPARQLServiceWrapper
			&& ((SPARQLServiceWrapper)service).mapInputsToObjectPosition();
	}

	/**
	 * Invoke the service on the given input nodes and return its output.
	 * Inputs are recorded in the tracker before invocation so they are not
	 * re-sent to the same service, and a service that reports itself dead
	 * is added to the dead-services set so later calls can skip it.  Any
	 * invocation failure is logged and an empty model is returned.
	 *
	 * @param service the service to invoke
	 * @param inputs the input nodes (resources; literals only reach here
	 *        for "inverted" SPARQL services)
	 * @return the service output model, or an empty model on failure
	 */
	private Model invokeService(Service service, Set<? extends RDFNode> inputs)
	{
		log.info(getServiceCallString(service, inputs));
		try {
			Model output = ModelFactory.createDefaultModel();

			StopWatch stopWatch = new StopWatch();
			stopWatch.start();

			/* see above about special-case coding...
			 */
			if (service instanceof SPARQLServiceWrapper) {
				// SPARQL services can accept literal inputs, so pass the raw nodes
				output = ((SPARQLServiceWrapper)service).invokeServiceOnRDFNodes(inputs.iterator(), output);
			} else {
				/* generate a list of the OntModel views of each resource
				 * so that the minimal model extractor can used inferred
				 * properties...
				 */
				Collection<Resource> inputResources = new ArrayList<Resource>(inputs.size());
				for (RDFNode input: inputs) {
					/* only resources should be in the collection by now, but
					 * let's be safe...
					 */
					if (!input.isResource())
						continue;

					Resource inputResource = input.inModel(reasoningModel).as(Resource.class);
					inputResources.add(inputResource);

					/* explicitly attach the input type of the service, since
					 * we know it's been dynamically classified as such...
					 * TODO this shouldn't be necessary and is probably a bug
					 */
					try {
						inputResource.addProperty(RDF.type, service.getInputClass());
					} catch (SADIException e) {
						log.error(String.format("error loading input class for service %s", service), e);
					}
				}
				// mark each input as sent (beenThere with no flag records it)
				// before the call, so a failure doesn't cause endless retries
				for (Resource inputResource: inputResources) {
					tracker.beenThere(service, inputResource);
				}
				output = service.invokeService(inputResources);
			}

			stopWatch.stop();
			log.trace(String.format("invocation of service %s completed in %dms", service.getURI(), stopWatch.getTime()));

//			/* remove output class from output triples;
//			 * TODO move this to client API...
//			 */
//			for (Iterator<Triple> triples = output.iterator(); triples.hasNext(); ) {
//				Triple triple = triples.next();
//				if (RDF.type.equals(triple.getPredicate().getURI()) &&
//					triple.getObject().isURI() &&
//					triple.getObject().getURI().equals(service.getOutputClassURI())) {
//
//					triples.remove();
//				}
//			}

			return output;
		} catch (ServiceInvocationException e) {
			log.error(String.format("failed to invoke service %s", service), e);

			// a fatal failure blacklists the service for the rest of the query
			if (e.isServiceDead()) {
				String serviceURI = service.getURI();
				log.warn(String.format("adding %s to dead services", serviceURI));
				deadServices.add(serviceURI);
			}

			return ModelFactory.createDefaultModel();
		}
	}

	/** Build the log message describing a service invocation. */
	private String getServiceCallString(Service service, Collection<? extends RDFNode> inputs)
	{
		return String.format("calling service %s (%s)", service.getName(), inputs);
	}

	/* I'm not 100% sure we don't have to care exactly how we encounter a
	 * particular query pattern, but I suspect worrying about it now would
	 * fall under the heading of premature optimization...
	 */
	private static class QueryPatternEnumerator extends ElementVisitorBase
	{
		List<Triple> queryPatterns;

		public QueryPatternEnumerator(Query query)
		{
			queryPatterns = new ArrayList<Triple>();
			ElementWalker.walk(query.getQueryPattern(), this);
		}

		public List<Triple> getQueryPatterns()
		{
			return queryPatterns;
		}

		public void visit(ElementTriplesBlock el)
		{
			for (Iterator<Triple> i = el.patternElts(); i.hasNext(); ) {
				queryPatterns.add((Triple)i.next());
			}
		}

		/*
		 * If the ARQ extensions to SPARQL are turned on (i.e. if the variable "share.sparql.useARQSyntax"
		 * not set to false in the share.properties file), then a basic triple pattern consists of list of
		 * "TriplePaths" rather than Triples.  TriplePaths are like Triples, but they allow XPath-like
		 * expressions in the predicate position, as described at http://jena.sourceforge.net/ARQ/property_paths.html.
		 * -- BV
		 */
		public void visit(ElementPathBlock el) {
			for (TriplePath triplePath : el.getPattern()) {
				if (triplePath.asTriple() == null) {
					log.error(String.format("this version of SHARE does not support ARQ TriplePaths, ignoring TriplePath: %s", triplePath));
					continue;
				}
				queryPatterns.add(triplePath.asTriple());
			}
		}
	}

	/**
	 * The set of values a query-pattern position may take: either a
	 * variable with a (possibly empty) set of bindings, or a single
	 * concrete value.
	 */
	private static class PotentialValues
	{
		// the variable node, or null if this wraps a concrete value
		Node variable;
		// current bindings (the single value, for the concrete case)
		Set<RDFNode> values;
		// stable key used by Tracker to identify this position
		String key;

		public PotentialValues(Node variable)
		{
			this.variable = variable;
			values = new HashSet<RDFNode>();
			key = "?" + variable.getName();
		}

		public PotentialValues(RDFNode value)
		{
			this.variable = null;
			values = new HashSet<RDFNode>();
			values.add(value);
			key = value.toString();
		}

		/** @return true if this position is a variable rather than a concrete value */
		public boolean isVariable()
		{
			return variable != null;
		}

		/** @return true if the position has no bindings */
		public boolean isEmpty()
		{
			return values.isEmpty();
		}

		public void add(RDFNode node)
		{
			log.trace(String.format("adding %s to variable %s", node, variable));
			values.add(node);
		}

		/** Replace all current bindings with the given ones. */
		public void setBindings(Collection<RDFNode> bindings)
		{
			log.trace(String.format("clearing bindings for variable %s", variable));
			values.clear();
			for(RDFNode binding : bindings) {
				add(binding);
			}
		}

		@Override
		public String toString()
		{
			if (isVariable()) {
				return String.format("?%s %s", variable.getName(), values);
			} else {
				return values.toString();
			}
		}
	}

	/**
	 * Remembers which (service, input) pairs have been invoked and which
	 * (variable, class) memberships have been tested, so neither is
	 * repeated within a query.
	 */
	private static class Tracker
	{
		private Set<String> visited;

		public Tracker()
		{
			visited = new HashSet<String>();
		}

		/** Record and report whether this (service, input) pair was seen before. */
		public synchronized boolean beenThere(Service service, RDFNode input)
		{
			return beenThere(service, input, true);
		}

		/**
		 * Report whether this (service, input) pair was seen before,
		 * recording it only if update is true.
		 */
		public synchronized boolean beenThere(Service service, RDFNode input, boolean update)
		{
			String key = getHashKey(service, input);
			if (visited.contains(key))
				return true;
			if (update) {
				log.trace(String.format("tracking (%s, %s)", service, input));
				visited.add(key);
			}
			return false;
		}

		/** Record and report whether this (variable, class) pair was seen before. */
		public synchronized boolean beenThere(PotentialValues instances, OntClass asClass)
		{
			String key = getHashKey(instances, asClass);
			if (visited.contains(key))
				return true;
			log.trace(String.format("tracking (%s, %s)", instances, asClass));
			visited.add(key);
			return false;
		}

		private String getHashKey(Service service, RDFNode input)
		{
			String key = service.getURI();
			// more special casing: must differentiate between
			// inverted and non-inverted sparql services
			if (service instanceof SPARQLServiceWrapper) {
				key += "(inverted=" + ((SPARQLServiceWrapper)service).mapInputsToObjectPosition() + ")";
			}
			return key + input.toString();
		}

		private String getHashKey(PotentialValues instances, OntClass asClass)
		{
			// one variable name or URI, one node ID or URI, so this should be safe...
			return String.format("%s %s", instances.key, asClass.toString());
		}
	}

	/*
	 * Rankings for expensive/unresolvable query patterns.
	 * -1 is reserved for PredicateStatsDB.NO_STATS_AVAILABLE.
	 */

	/** Cost sentinel: the pattern is resolvable but expected to be expensive. */
	protected static final int COST_EXPENSIVE = -2;
	/** Cost sentinel: no service can resolve the pattern. */
	protected static final int COST_UNRESOLVABLE = -3;

	protected class QueryPatternComparator implements Comparator<Triple>
	{
		private PropertyResolvabilityCache resolvabilityCache = new PropertyResolvabilityCache(getRegistry());

		/**
		 * Order two triple patterns by estimated resolution cost, falling
		 * back to a binding-count comparison when stats are unavailable
		 * for either pattern.
		 */
		@Override
		public int compare(Triple pattern1, Triple pattern2)
		{
			int cost1 = costByStats(pattern1);
			int cost2 = costByStats(pattern2);

			/*
			 * If no stats were available for one or both patterns, compare them by
			 * the number of inputs that will be sent to matching services.
			 */
			if (cost1 == PredicateStatsDB.NO_STATS_AVAILABLE || cost2 == PredicateStatsDB.NO_STATS_AVAILABLE) {
				cost1 = costByBindings(pattern1);
				cost2 = costByBindings(pattern2);
			}

			return compare(cost1, cost2);
		}

		/**
		 * Compare two cost values.  Positive (known) costs always sort
		 * before negative sentinels; within the same sign, the smaller
		 * magnitude sorts first.
		 */
		protected int compare(int cost1, int cost2)
		{
			/* Note: the first case handles when both numbers are both BAD, both WORST, etc. */
			if (cost1 == cost2)
				return 0;
			if (cost1 > 0 && cost2 < 0)
				return -1;
			if (cost1 < 0 && cost2 > 0)
				return 1;
			/* CASE: both costs have the same sign */
			return Integer.compare(Math.abs(cost1), Math.abs(cost2));
		}

		/**
		 * Estimate the cost of resolving the given triple pattern from
		 * recorded predicate statistics.  Returns the COST_UNRESOLVABLE or
		 * COST_EXPENSIVE sentinels for patterns that can't (or shouldn't)
		 * be resolved directly; otherwise delegates to the stats-based
		 * estimate for the bound predicate(s).
		 */
		protected int costByStats(Triple pattern)
		{
			// patterns no registered service can resolve are ranked last
			if(!isResolvable(pattern)) {
				return COST_UNRESOLVABLE;
			}

			PotentialValues s = expandQueryNode(pattern.getSubject());
			PotentialValues p = expandQueryNode(pattern.getPredicate());
			PotentialValues o = expandQueryNode(pattern.getObject());

			if (s.isEmpty() && o.isEmpty()) {

				/* CASES: (?s, ?p, ?o), (?s, bound, ?o)
				 *
				 * These cases are included here for completeness, but they
				 * should be caught by the isResolvable check above.
				 */
				return COST_UNRESOLVABLE;

			} else if (p.isEmpty()) {

				/* CASES: (bound, ?p, ?o), (?s, ?p, bound), (bound, ?p, bound) */
				return COST_EXPENSIVE;

			} else {

				/* CASES: (bound, bound, ?o), (?s, bound, bound), (bound, bound, bound) */

				Set<OntProperty> properties = getOntProperties(RdfUtils.extractResources(p.values));

				/* predicate has no bindings that are URIs */
				if(properties.size() == 0) {
					return 0;
				}

				if(!s.isEmpty() && !o.isEmpty()) {

					/* CASE: (bound, bound, bound) */
					// both directions are possible; take the cheaper one
					int forwardCost = costByStats(properties, true, s.values.size());
					int reverseCost = costByStats(properties, false, o.values.size());

					if(compare(forwardCost, reverseCost) <= 0) {
						return forwardCost;
					} else {
						return reverseCost;
					}

				}
				else if(!s.isEmpty()) {

					/* CASE: (bound, bound, ?o) */

					return costByStats(properties, true, s.values.size());

				} else {

					/* CASE: (?s, bound, bound) */

					return costByStats(properties, false, o.values.size());
				}

			}
		}

		protected int costByStats(Set<OntProperty> predicates, boolean directionIsForward, int numInputs)
		{
			if(predicates.size() == 0) {
				return 0;
			}

			if (getStatsDB() == null) {
				log.error("stats DB was not correctly initialized, returning NO_STATS_AVAILABLE for time estimate");
				return PredicateStatsDB.NO_STATS_AVAILABLE;
			}

			/*
			 * Once we have added in the cost for resolving a property, ignore
			 * the costs of any equivalent properties (since these
			 * will resolve to the same services).
			 */

			List<OntProperty> visitedPredicates = new ArrayList<OntProperty>(predicates.size());

			int totalCost = 0;

			for(OntProperty predicate : predicates) {

				boolean skipThisPredicate = false;;

				for(OntProperty visitedPredicate : visitedPredicates) {
					if(equivalent(predicate, visitedPredicate)) {
						skipThisPredicate = true;
						break;
					}
				}

				if(skipThisPredicate) {
					continue;
				}

				int cost = getStatsDB().getEstimatedTime(predicate, directionIsForward, numInputs);

				/*
				 * If even one of the predicates has no stats available,
				 * then we can't estimate the overall cost.
				 */

				if(cost == PredicateStatsDB.NO_STATS_AVAILABLE) {
					return PredicateStatsDB.NO_STATS_AVAILABLE;
				}

				visitedPredicates.add(predicate);
				totalCost += cost;

			}

			return totalCost;
		}

		protected boolean equivalent(OntProperty p1, OntProperty p2)
		{
			for(OntProperty equivalentProperty : p1.listEquivalentProperties().toList()) {
				if(equivalentProperty.getURI().equals(p2.getURI())) {
					return true;
				}
			}
			return false;
		}

		protected int costByBindings(Triple pattern)
		{

			if(!isResolvable(pattern)) {
				return COST_UNRESOLVABLE;
			}

			PotentialValues s = expandQueryNode(pattern.getSubject());
			PotentialValues o = expandQueryNode(pattern.getObject());

			if(s.isEmpty() && o.isEmpty()) {

				/*
				 * CASES: (?s, ?p, ?o), (?s, bound, ?o)
				 *
				 * These cases are included here for completeness, but they
				 * should be caught by the isResolvable check above.
				 */
				return COST_UNRESOLVABLE;

			} else if(!s.isEmpty() && !o.isEmpty()) {

				return Math.min(s.values.size(), o.values.size());

			} else if(!s.isEmpty()) {

				return s.values.size();

			} else {

				return o.values.size();

			}
		}

		protected boolean bestDirectionIsForward(Triple pattern)
		{

			/*
			 * If both the forward and reverse directions are unresolvable, then
			 * it doesn't matter what value we return.
			 */
			if(!isResolvable(pattern)) {
				return true;
			}

			PotentialValues s = expandQueryNode(pattern.getSubject());
			PotentialValues p = expandQueryNode(pattern.getPredicate());
			PotentialValues o = expandQueryNode(pattern.getObject());

			if(s.isEmpty() || o.isEmpty()) {
				throw new RuntimeException("expected both subject and object positions to be bound");
			}

			Set<OntProperty> properties = getOntProperties(RdfUtils.extractResources(p.values));

			/*
			 * CASE: The predicate is a variable with bindings, but none of the
			 * bindings are URIs.  It doesn't matter which direction we choose,
			 * because the pattern has no solutions.
			 */

			if(!p.isEmpty() && properties.size() == 0) {
				return true;
			}

			int forwardCost;
			int reverseCost;

			/*
			 * CASE: The predicate is an unbound variable, so we can't
			 * make use of our predicate-based stats. Instead, compare the
			 * number of bindings for the subject and object.
			 */

			if(p.isEmpty()) {

				forwardCost = s.values.size();
				reverseCost = o.values.size();

			} else {

				Collection<OntProperty> inverseProperties = getInverseProperties(properties);

				if(resolvabilityCache.isResolvable(properties) && !resolvabilityCache.isResolvable(inverseProperties)) {

					return true;

				} else if (!resolvabilityCache.isResolvable(properties) && resolvabilityCache.isResolvable(inverseProperties)) {

					return false;

				}

				forwardCost = costByStats(properties, true, s.values.size());
				reverseCost = costByStats(properties, false, o.values.size());

				/*
				 * If no stats were available for one or both patterns, compare them by
				 * the number of inputs that will be sent to matching services.
				 */

				if(forwardCost == PredicateStatsDB.NO_STATS_AVAILABLE || reverseCost == PredicateStatsDB.NO_STATS_AVAILABLE) {

					forwardCost = s.values.size();
					reverseCost = o.values.size();

				}

			}

			return (compare(forwardCost, reverseCost) <= 0);
		}

		/**
		 * Return true if this pattern can be mapped to at least one web service
		 * in the forward or reverse direction.
		 *
		 * @param pattern
		 * @return true if the pattern is web-service-resolvable
		 */
		protected boolean isResolvable(Triple pattern)
		{

			PotentialValues s = expandQueryNode(pattern.getSubject());
			PotentialValues p = expandQueryNode(pattern.getPredicate());
			PotentialValues o = expandQueryNode(pattern.getObject());

			if(p.isEmpty()) {

				/*
				 * CASES: (?s, ?p, ?o), (bound, ?p, ?o), (?s, ?p, bound), (bound, ?p, bound)
				 *
				 * We assume that the bound subject and/or object will be a valid input
				 * for at least one service.  (We have no efficient way of checking this
				 * for sure.)
				 */

				return (!s.isEmpty() || !o.isEmpty());

			} else {

				/* CASES: (?s, bound, ?o), (bound, bound, ?o), (?s, bound, bound) */

				Set<OntProperty> properties = getOntProperties(RdfUtils.extractResources(p.values));

				/* CASE: predicate has no bindings that are URIs
				 *
				 * This indicates that the query will have no solutions.
				 */

				if(properties.size() == 0) {
					return true;
				}

				if(!s.isEmpty()) {

					if(resolvabilityCache.isResolvable(properties)) {
						return true;
					}

				}

				if(!o.isEmpty()) {

					if(resolvabilityCache.isResolvable(getInverseProperties(properties))) {
						return true;
					}

				}

			}

			return false;
		}

	}


}
